www.gusucode.com > VC++ P2P下载软件源代码-源码程序 > VC++ P2P下载软件源代码-源码程序\code\client\DownloadManager.cpp

    //Download by http://www.NewXing.com
/* 
 * Copyright (C) 2001-2003 Jacek Sieka, j_s@telia.com
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

#include "stdinc.h"
#include "DCPlusPlus.h"

#include "DownloadManager.h"
#include "ConnectionManager.h"
#include "User.h"
#include "QueueManager.h"
#include "LogManager.h"
#include "CryptoManager.h"
#include "SFVReader.h"

DownloadManager* Singleton<DownloadManager>::instance = NULL;

static const string DOWNLOAD_AREA = "Downloads";
static const string ANTI_FRAG_EXT = ".antifrag";

Download::Download(QueueItem* qi) throw() : source(qi->getCurrent()->getPath()),
	target(qi->getTarget()), tempTarget(qi->getTempTarget()), 
	comp(NULL), bytesLeft(0), rollbackBuffer(NULL), rollbackSize(0) { 
	
	setSize(qi->getSize());
	if(qi->isSet(QueueItem::FLAG_USER_LIST))
		setFlag(Download::FLAG_USER_LIST);
	if(qi->isSet(QueueItem::FLAG_RESUME))
		setFlag(Download::FLAG_RESUME);
};

void DownloadManager::onTimerSecond(u_int32_t /*aTick*/) {
	Lock l(cs);

	Download::List tickList;
	// Tick each ongoing download
	for(Download::Iter i = downloads.begin(); i != downloads.end(); ++i) {
		if((*i)->getTotal() > 0) {
			tickList.push_back(*i);
		}
	}

	if(tickList.size() > 0)
		fire(DownloadManagerListener::TICK, tickList);
}

void DownloadManager::FileMover::moveFile(const string& source, const string& target) {
	Lock l(cs);
	files.push_back(make_pair(source, target));
	if(!active) {
		active = true;
		start();
	}
}

int DownloadManager::FileMover::run() {
	for(;;) {
		FilePair next;
		{
			Lock l(cs);
			if(files.empty()) {
				active = false;
				return 0;
			}
			next = files.back();
			files.pop_back();
		}
		try {
			File::renameFile(next.first, next.second);
		} catch(const FileException&) {
			// Too bad...
		}
	}
}

void DownloadManager::removeConnection(UserConnection::Ptr aConn, bool reuse /* = false */) {
	dcassert(aConn->getDownload() == NULL);
	aConn->removeListener(this);
	ConnectionManager::getInstance()->putDownloadConnection(aConn, reuse);
}

void DownloadManager::checkDownloads(UserConnection* aConn) {

	if( ((SETTING(DOWNLOAD_SLOTS) != 0) && getDownloads() >= SETTING(DOWNLOAD_SLOTS)) ||
		((SETTING(MAX_DOWNLOAD_SPEED) != 0 && getAverageSpeed() >= (SETTING(MAX_DOWNLOAD_SPEED)*1024)) ) ) {
		
		if(!QueueManager::getInstance()->hasDownload(aConn->getUser(), QueueItem::HIGHEST)) {
			removeConnection(aConn);
			return;
		}
	}
	
	Download* d = QueueManager::getInstance()->getDownload(aConn->getUser());
	
	if(d) {
		dcassert(aConn->getDownload() == NULL);
		d->setUserConnection(aConn);
		aConn->setDownload(d);
		aConn->setState(UserConnection::STATE_FILELENGTH);

		{
			Lock l(cs);
			downloads.push_back(d);
		}
		
		if(d->isSet(Download::FLAG_RESUME)) {
			int64_t size = File::getSize(d->getTempTarget().empty() ? d->getTarget() : d->getTempTarget());
			int rollback = SETTING(ROLLBACK);
			int cutoff = max(SETTING(ROLLBACK), SETTING(BUFFER_SIZE)*1024);

			// dcassert(d->getSize() != -1);
			if( (rollback + cutoff) > min(size, d->getSize()) ) {
				d->setPos(0);
			} else {
				d->setPos(size - rollback - cutoff);
				d->setRollbackBuffer(rollback);
				d->setFlag(Download::FLAG_ROLLBACK);
			}
		} else {
			d->setPos(0);
		}
		if(d->isSet(Download::FLAG_USER_LIST) && aConn->isSet(UserConnection::FLAG_SUPPORTS_BZLIST)) {
			d->setSource("MyList.bz2");
		}

		if(BOOLSETTING(COMPRESS_TRANSFERS) && !d->isSet(Download::FLAG_USER_LIST) && 
			aConn->isSet(UserConnection::FLAG_SUPPORTS_GETZBLOCK)) {

			// This one, we'll download with a bzblock download instead...
			d->setFlag(Download::FLAG_ZDOWNLOAD);
			d->bytesLeft = d->getSize() - d->getPos();
			d->setComp(new ZDecompressor());
			aConn->getZBlock(d->getSource(), d->getPos(), d->bytesLeft);
		} else {
			aConn->get(d->getSource(), d->getPos());
		}
		return;
	}

	// Connection not needed any more, return it to the ConnectionManager...
	removeConnection(aConn, true);
}

void DownloadManager::onSending(UserConnection* aSource) {
	if(aSource->getState() != UserConnection::STATE_FILELENGTH) {
		dcdebug("DM::onFileLength Bad state, ignoring\n");
		return;
	}
	
	if(prepareFile(aSource)) {
		aSource->setDataMode();
	}
}

bool DownloadManager::prepareFile(UserConnection* aSource, int64_t newSize /* = -1 */) {
	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	if(newSize != -1)
		d->setSize(newSize);

	dcassert(d->getSize() != -1);

	string target = d->getTempTarget().empty() ? d->getTarget() : d->getTempTarget();
	Util::ensureDirectory(target);
	if(d->isSet(Download::FLAG_USER_LIST) && aSource->isSet(UserConnection::FLAG_SUPPORTS_BZLIST)) {
		target.replace(target.size() - 5, 5, "bz2");
	}
	File* file;
	try {
		// Let's check if we can find this file in a any .SFV...
		int trunc = d->isSet(Download::FLAG_RESUME) ? 0 : File::TRUNCATE;
		bool sfvcheck = BOOLSETTING(SFV_CHECK) && (d->getPos() == 0) && (SFVReader(d->getTarget()).hasCRC());
		
		if(BOOLSETTING(ANTI_FRAG) && !d->isSet(Download::FLAG_USER_LIST)) {
			// Anti-frag file...First, remove any old attempt that might have existed
			// and rename any partial file alread downloaded...
			string atarget = target + ANTI_FRAG_EXT;
			try {
				File::deleteFile(atarget);
				File::renameFile(target, atarget);
			} catch(const FileException& e) {
				dcdebug("AntiFrag: %s\n", e.getError().c_str());
			}
			file = new SizedFile(d->getSize(), atarget, File::RW, File::OPEN | File::CREATE | trunc, sfvcheck);

			d->setFlag(Download::FLAG_ANTI_FRAG);
		} else {
			file = new BufferedFile(target, File::RW, File::OPEN | File::CREATE | trunc, sfvcheck);			
		}

		file->setPos(d->getPos());
		
	} catch(const FileException& e) {
		fire(DownloadManagerListener::FAILED, d, STRING(COULD_NOT_OPEN_TARGET_FILE) + e.getError());
		aSource->setDownload(NULL);
		removeDownload(d);
		removeConnection(aSource);
		return false;
	}

	dcassert(d->getPos() != -1);
	d->setFile(file);
	
	if(d->getSize() <= d->getPos()) {
		aSource->setDownload(NULL);
		removeDownload(d, true);
		removeConnection(aSource);
		return false;
	} else {
		d->setStart(GET_TICK());
		aSource->setState(UserConnection::STATE_DONE);
		
		fire(DownloadManagerListener::STARTING, d);
		
	}
	
	return true;
}	

void DownloadManager::onFileLength(UserConnection* aSource, const string& aFileLength) {

	if(aSource->getState() != UserConnection::STATE_FILELENGTH) {
		dcdebug("DM::onFileLength Bad state, ignoring\n");
		return;
	}

	int64_t fileLength = Util::toInt64(aFileLength);
	if(prepareFile(aSource, fileLength)) {
		aSource->setDataMode(aSource->getDownload()->getSize() - aSource->getDownload()->getPos());
		aSource->startSend();
	}
}

bool DownloadManager::checkRollback(Download* d, const u_int8_t* aData, int aLen) throw(FileException) {
	dcassert(d->getRollbackBuffer());
	
	if(d->getTotal() + aLen >= d->getRollbackSize()) {
		AutoArray<u_int8_t> buf(d->getRollbackSize());
		int len = d->getRollbackSize() - (int)d->getTotal();
		dcassert(len > 0);
		dcassert(len <= d->getRollbackSize());
		memcpy(d->getRollbackBuffer() + d->getTotal(), aData, len);
		
		d->getFile()->read((u_int8_t*)buf, d->getRollbackSize());
		
		int cmp = memcmp(d->getRollbackBuffer(), buf, d->getRollbackSize());
		
		d->unsetFlag(Download::FLAG_ROLLBACK);
		d->setRollbackBuffer(0);
		
		if(cmp != 0) {
			return false;
		}
		if(!d->isSet(Download::FLAG_ANTI_FRAG))
			d->getFile()->setEOF();
		// Write the rest...the file pointer should have been moved to the correct position by now...
		d->getFile()->write(aData+len, aLen - len);
	} else {
		memcpy(d->getRollbackBuffer() + d->getTotal(), aData, aLen);
	}
	
	return true;
}

void DownloadManager::onData(UserConnection* aSource, const u_int8_t* aData, int aLen) {
	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	if(d->isSet(Download::FLAG_ZDOWNLOAD)) {
		// Oops, this is a bit more work...
		dcassert(d->getComp() != NULL);
		dcassert(d->bytesLeft > 0);
		int l = aLen;
		while(l > 0) {
			const u_int8_t* data = aData + (aLen - l);
			try {
				u_int32_t b = d->getComp()->decompress(data, l);
				if(!handleData(aSource, d->getComp()->getOutbuf(), b))
					break;
				d->bytesLeft -= b;
				if(d->bytesLeft == 0) {
					aSource->setLineMode();
					handleEndData(aSource);
					if(l != 0) {
						// Uhm, this client must be sending junk data after the compressed block...
						aSource->disconnect();
						return;
					}
				}
			} catch(const CryptoException&) {
				// Oops, decompression error...could happen for many reasons
				// but the most probable is that we received bad data...
				// We remove whatever we managed to download in this sessions
				// as it might be bad all of it...
				try {
					d->getFile()->movePos(-d->getTotal());
					d->getFile()->setEOF();
				} catch(const FileException&) {
					// Ignore...
				}

				fire(DownloadManagerListener::FAILED, d, STRING(DECOMPRESSION_ERROR));

				aSource->setDownload(NULL);
				removeDownload(d);
				removeConnection(aSource);
				return;
			}
		}
	} else {
		handleData(aSource, aData, aLen);
	}
}

bool DownloadManager::handleData(UserConnection* aSource, const u_int8_t* aData, int aLen) {
	dcassert(aSource->getState() == UserConnection::STATE_DONE);

	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	try {
		if(d->isSet(Download::FLAG_ROLLBACK)) {
			if(!checkRollback(d, aData, aLen)) {
				fire(DownloadManagerListener::FAILED, d, STRING(ROLLBACK_INCONSISTENCY));
				
				string target = d->getTarget();
				
				aSource->setDownload(NULL);
				removeDownload(d);				

				QueueManager::getInstance()->removeSource(target, aSource->getUser(), QueueItem::Source::FLAG_ROLLBACK_INCONSISTENCY);
				removeConnection(aSource);
				return false;
			} 
		} else {
			d->getFile()->write(aData, aLen);
		}
		d->addPos(aLen);
	} catch(const FileException& e) {
		fire(DownloadManagerListener::FAILED, d, e.getError());
		
		aSource->setDownload(NULL);
		removeDownload(d);
		removeConnection(aSource);
		return false;
	}
	return true;
}

void DownloadManager::onModeChange(UserConnection* aSource, int /*aNewMode*/) {
	handleEndData(aSource);
}

/** Download finished! */
void DownloadManager::handleEndData(UserConnection* aSource) {

	dcassert(aSource->getState() == UserConnection::STATE_DONE);
	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	// First, finish writing the file (flushing the buffers and closing the file...)
	try {
		d->getFile()->close();

		// Check if we're anti-fragging...
		if(d->isSet(Download::FLAG_ANTI_FRAG)) {
			// Ok, rename the file to what we expect it to be...
			try {
				const string& tgt = d->getTempTarget().empty() ? d->getTarget() : d->getTempTarget();
				File::renameFile(tgt + ANTI_FRAG_EXT, tgt);
				d->unsetFlag(Download::FLAG_ANTI_FRAG);
			} catch(const FileException& e) {
				dcdebug("AntiFrag: %s\n", e.getError().c_str());
				// Now what?
			}
		}
	} catch(const FileException& e) {
		fire(DownloadManagerListener::FAILED, d, e.getError());
		
		aSource->setDownload(NULL);
		removeDownload(d);
		removeConnection(aSource);
		return;
	}
	
	dcassert(d->getPos() == d->getSize());
	dcdebug("Download finished: %s, size %I64d\n", d->getTarget().c_str(), d->getSize());

	// Check if we have some crc:s...
	dcassert(d->getFile() != NULL);

	if(BOOLSETTING(SFV_CHECK)) {
		d->getFile()->close();
		SFVReader sfv(d->getTarget());
		if(sfv.hasCRC()) {
			bool crcMatch;
			const string& tgt = d->getTempTarget().empty() ? d->getTarget() : d->getTempTarget();
			if(d->getFile()->hasCRC32()) {
				crcMatch = (d->getFile()->getCRC32() == sfv.getCRC());
			} else {
				// More complicated, we have to reread the file
				try {
					
					File f(tgt, File::READ, File::OPEN, true);
					const u_int32_t BUF_SIZE = 16 * 65536;
					AutoArray<u_int8_t> b(BUF_SIZE);
					while(f.read((u_int8_t*)b, BUF_SIZE) > 0)
						;		// Keep on looping...

					crcMatch = (f.getCRC32() == sfv.getCRC());
				} catch (FileException&) {
					// Nope; read failed...
					goto noCRC;
				}
			}

			if(!crcMatch) {
				File::deleteFile(tgt);
				dcdebug("DownloadManager: CRC32 mismatch for %s\n", d->getTarget().c_str());
				fire(DownloadManagerListener::FAILED, d, STRING(SFV_INCONSISTENCY));
				
				string target = d->getTarget();
				
				aSource->setDownload(NULL);
				removeDownload(d);				
				
				QueueManager::getInstance()->removeSource(target, aSource->getUser(), QueueItem::Source::FLAG_CRC_WARN, false);
				checkDownloads(aSource);
				return;
			} 

			d->setFlag(Download::FLAG_CRC32_OK);
			
			dcdebug("DownloadManager: CRC32 match for %s\n", d->getTarget().c_str());
		}
	}
noCRC:
	delete d->getFile();
	d->setFile(NULL);
	
	if(BOOLSETTING(LOG_DOWNLOADS)) {
		StringMap params;
		params["target"] = d->getTarget();
		params["user"] = aSource->getUser()->getNick();
		params["hub"] = aSource->getUser()->getLastHubName();
		params["hubip"] = aSource->getUser()->getLastHubIp();
		params["size"] = Util::toString(d->getSize());
		params["sizeshort"] = Util::formatBytes(d->getSize());
		params["chunksize"] = Util::toString(d->getTotal());
		params["chunksizeshort"] = Util::formatBytes(d->getTotal());
		params["speed"] = Util::formatBytes(d->getAverageSpeed()) + "/s";
		params["time"] = Util::formatSeconds((GET_TICK() - d->getStart()) / 1000);
		params["sfv"] = Util::toString(d->isSet(Download::FLAG_CRC32_OK) ? 1 : 0);
		LOG(DOWNLOAD_AREA, Util::formatParams(SETTING(LOG_FORMAT_POST_DOWNLOAD), params));
	}

	// Check if we need to move the file
	if( !d->getTempTarget().empty() && (Util::stricmp(d->getTarget().c_str(), d->getTempTarget().c_str()) != 0) ) {
		try {
			Util::ensureDirectory(d->getTarget());
			if(File::getSize(d->getTempTarget()) > MOVER_LIMIT) {
				mover.moveFile(d->getTempTarget(), d->getTarget());
			} else {
				File::renameFile(d->getTempTarget(), d->getTarget());
			}
			d->setTempTarget(Util::emptyString);
		} catch(const FileException&) {
			// Huh??? Now what??? Oh well...let it be...
		}
	}
	fire(DownloadManagerListener::COMPLETE, d);
	
	aSource->setDownload(NULL);
	removeDownload(d, true);
	checkDownloads(aSource);
}

void DownloadManager::onMaxedOut(UserConnection* aSource) { 
	if(aSource->getState() != UserConnection::STATE_FILELENGTH) {
		dcdebug("DM::onMaxedOut Bad state, ignoring\n");
		return;
	}

	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	fire(DownloadManagerListener::FAILED, d, STRING(NO_SLOTS_AVAILABLE));

	aSource->setDownload(NULL);
	removeDownload(d);
	removeConnection(aSource);
}

void DownloadManager::onFailed(UserConnection* aSource, const string& aError) {
	Download* d = aSource->getDownload();

	if(d == NULL) {
		removeConnection(aSource);
		return;
	}
	
	fire(DownloadManagerListener::FAILED, d, aError);

	string target = d->getTarget();
	aSource->setDownload(NULL);
	removeDownload(d);
	
	if(aError.find("File Not Available") != string::npos) {
		QueueManager::getInstance()->removeSource(target, aSource->getUser(), QueueItem::Source::FLAG_FILE_NOT_AVAILABLE, false);
	}

	removeConnection(aSource);
}

void DownloadManager::removeDownload(Download* d, bool finished /* = false */) {
	if(d->getFile()) {
		try {
			if(d->isSet(Download::FLAG_ANTI_FRAG)) {
				// Ok, set the pos to whereever it was last writing and hope for the best...
				d->getFile()->close();
				const string& tgt = d->getTempTarget().empty() ? d->getTarget() : d->getTempTarget();
				File::renameFile(tgt + ANTI_FRAG_EXT, tgt);
				d->unsetFlag(Download::FLAG_ANTI_FRAG);
			} else {
				d->getFile()->close();
			}
			delete d->getFile();
		} catch(const FileException&) {
			finished = false;
		}

		d->setFile(NULL);
	}

	if(d->getComp()) {
		delete d->getComp();
		d->setComp(NULL);
	}

	{
		Lock l(cs);
		// Either I'm stupid or the msvc7 optimizer is doing something _very_ strange here...
		// STL-port -D_STL_DEBUG complains that .begin() and .end() don't have the same owner (!),
		// but only in release build

		dcassert(find(downloads.begin(), downloads.end(), d) != downloads.end());

		//		downloads.erase(find(downloads.begin(), downloads.end(), d));
		
		for(Download::Iter i = downloads.begin(); i != downloads.end(); ++i) {
			if(*i == d) {
				downloads.erase(i);
				break;
			}
		}
	}
	QueueManager::getInstance()->putDownload(d, finished);
}

void DownloadManager::abortDownload(const string& aTarget) {
	Lock l(cs);
	for(Download::Iter i = downloads.begin(); i != downloads.end(); ++i) {
		Download* d = *i;
		if(d->getTarget() == aTarget) {
			dcassert(d->getUserConnection() != NULL);
			d->getUserConnection()->disconnect();
			break;
		}
	}
}

void DownloadManager::onFileNotAvailable(UserConnection* aSource) throw() {
	Download* d = aSource->getDownload();
	dcassert(d != NULL);

	dcdebug("File Not Available: %s\n", d->getTarget().c_str());

	if(d->getFile()) {
		delete d->getFile();
		d->setFile(NULL);
	}

	fire(DownloadManagerListener::FAILED, d, d->getTargetFileName() + ": " + STRING(FILE_NOT_AVAILABLE));

	aSource->setDownload(NULL);

	QueueManager::getInstance()->removeSource(d->getTarget(), aSource->getUser(), QueueItem::Source::FLAG_FILE_NOT_AVAILABLE, false);
	removeDownload(d, false);
	checkDownloads(aSource);
}

// UserConnectionListener
void DownloadManager::onAction(UserConnectionListener::Types type, UserConnection* conn) throw() {
	switch(type) {
	case UserConnectionListener::MAXED_OUT: onMaxedOut(conn); break;
	case UserConnectionListener::FILE_NOT_AVAILABLE: onFileNotAvailable(conn); break;
	case UserConnectionListener::SENDING: onSending(conn); break;
	default: break;
	}
}
void DownloadManager::onAction(UserConnectionListener::Types type, UserConnection* conn, const string& line) throw() {
	switch(type) {
	case UserConnectionListener::FILE_LENGTH:
		onFileLength(conn, line); break;
	case UserConnectionListener::FAILED:
		onFailed(conn, line); break;
	default:
		break;
	}
}
void DownloadManager::onAction(UserConnectionListener::Types type, UserConnection* conn, const u_int8_t* data, int len) throw() {
	switch(type) {
	case UserConnectionListener::DATA:
		onData(conn, data, len); break;
	default:
		break;
	}
}

void DownloadManager::onAction(UserConnectionListener::Types type, UserConnection* conn, int mode) throw() {
	switch(type) {
	case UserConnectionListener::MODE_CHANGE:
		onModeChange(conn, mode); break;
	default:
		break;
	}
}

// TimerManagerListener
void DownloadManager::onAction(TimerManagerListener::Types type, u_int32_t aTick) throw() {
	switch(type) {
	case TimerManagerListener::SECOND:
		onTimerSecond(aTick); break;
	default:
		break;
	}
}

/**
 * @file
 * $Id: DownloadManager.cpp,v 1.74 2003/06/20 10:49:27 arnetheduck Exp $
 */